/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import junit.framework.Test;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * BrokerBenchmark is used to get an idea of the raw performance of a broker.
 * Since the broker data structures using in message dispatching are under high
 * contention from client requests, it's performance should be monitored closely
 * since it typically is the biggest bottleneck in a high performance messaging
 * fabric. The benchmarks are run under all the following combinations options:
 * Queue vs. Topic, 1 vs. 10 producer threads, 1 vs. 10 consumer threads, and
 * Persistent vs. Non-Persistent messages. Message Acking uses client ack style
 * batch acking since that typically has the best ack performance.
 */
public class BrokerBenchmark extends BrokerTestSupport {

   private static final transient Logger LOG = LoggerFactory.getLogger(BrokerBenchmark.class);

   public int produceCount = Integer.parseInt(System.getProperty("PRODUCE_COUNT", "10000"));
   public ActiveMQDestination destination;
   public int prodcuerCount;
   public int consumerCount;
   public boolean deliveryMode;

   public void initCombosForTestPerformance() {
      addCombinationValues("destination", new Object[]{new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")});
      addCombinationValues("PRODUCER_COUNT", new Object[]{new Integer("1"), new Integer("10")});
      addCombinationValues("CONSUMER_COUNT", new Object[]{new Integer("1"), new Integer("10")});
      addCombinationValues("CONSUMER_COUNT", new Object[]{new Integer("1"), new Integer("10")});
      addCombinationValues("deliveryMode", new Object[]{Boolean.TRUE});
   }

   public void testPerformance() throws Exception {

      LOG.info("Running Benchmark for destination=" + destination + ", producers=" + prodcuerCount + ", consumers=" + consumerCount + ", deliveryMode=" + deliveryMode);
      final int consumeCount = destination.isTopic() ? consumerCount * produceCount : produceCount;

      final Semaphore consumersStarted = new Semaphore(1 - consumerCount);
      final Semaphore producersFinished = new Semaphore(1 - prodcuerCount);
      final Semaphore consumersFinished = new Semaphore(1 - consumerCount);
      final ProgressPrinter printer = new ProgressPrinter(produceCount + consumeCount, 10);

      // Start a producer and consumer

      profilerPause("Benchmark ready.  Start profiler ");

      long start = System.currentTimeMillis();

      final AtomicInteger receiveCounter = new AtomicInteger(0);
      for (int i = 0; i < consumerCount; i++) {
         new Thread() {
            @Override
            public void run() {
               try {

                  // Consume the messages
                  StubConnection connection = new StubConnection(broker);
                  ConnectionInfo connectionInfo = createConnectionInfo();
                  connection.send(connectionInfo);

                  SessionInfo sessionInfo = createSessionInfo(connectionInfo);
                  ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
                  consumerInfo.setPrefetchSize(1000);
                  connection.send(sessionInfo);
                  connection.send(consumerInfo);

                  consumersStarted.release();

                  while (receiveCounter.get() < consumeCount) {

                     int counter = 0;
                     // Get a least 1 message.
                     Message msg = receiveMessage(connection, 2000);
                     if (msg != null) {
                        printer.increment();
                        receiveCounter.incrementAndGet();

                        counter++;

                        // Try to piggy back a few extra message acks if
                        // they are ready.
                        Message extra = null;
                        while ((extra = receiveMessage(connection, 0)) != null) {
                           msg = extra;
                           printer.increment();
                           receiveCounter.incrementAndGet();
                           counter++;
                        }
                     }

                     if (msg != null) {
                        connection.send(createAck(consumerInfo, msg, counter, MessageAck.STANDARD_ACK_TYPE));
                     } else if (receiveCounter.get() < consumeCount) {
                        LOG.info("Consumer stall, waiting for message #" + receiveCounter.get() + 1);
                     }
                  }

                  connection.send(closeConsumerInfo(consumerInfo));
               } catch (Throwable e) {
                  e.printStackTrace();
               } finally {
                  consumersFinished.release();
               }
            }

         }.start();
      }

      // Make sure that the consumers are started first to avoid sending
      // messages
      // before a topic is subscribed so that those messages are not missed.
      consumersStarted.acquire();

      // Send the messages in an async thread.
      for (int i = 0; i < prodcuerCount; i++) {
         new Thread() {
            @Override
            public void run() {
               try {
                  StubConnection connection = new StubConnection(broker);
                  ConnectionInfo connectionInfo = createConnectionInfo();
                  connection.send(connectionInfo);

                  SessionInfo sessionInfo = createSessionInfo(connectionInfo);
                  ProducerInfo producerInfo = createProducerInfo(sessionInfo);
                  connection.send(sessionInfo);
                  connection.send(producerInfo);

                  for (int i = 0; i < produceCount / prodcuerCount; i++) {
                     Message message = createMessage(producerInfo, destination);
                     message.setPersistent(deliveryMode);
                     message.setResponseRequired(false);
                     connection.send(message);
                     printer.increment();
                  }
               } catch (Throwable e) {
                  e.printStackTrace();
               } finally {
                  producersFinished.release();
               }
            }
         }.start();
      }

      producersFinished.acquire();
      long end1 = System.currentTimeMillis();
      consumersFinished.acquire();
      long end2 = System.currentTimeMillis();

      LOG.info("Results for destination=" + destination + ", producers=" + prodcuerCount + ", consumers=" + consumerCount + ", deliveryMode=" + deliveryMode);
      LOG.info("Produced at messages/sec: " + (produceCount * 1000.0 / (end1 - start)));
      LOG.info("Consumed at messages/sec: " + (consumeCount * 1000.0 / (end2 - start)));
      profilerPause("Benchmark done.  Stop profiler ");
   }

   public static Test suite() {
      return suite(BrokerBenchmark.class);
   }

   public static void main(String[] args) {
      junit.textui.TestRunner.run(suite());
   }

}
